---
sidebar_position: 2
---

import Mermaid from '@theme/Mermaid';

# Normalize a flat structure

In traditional ETL, normalizing flat input (that is, dispatching one row into several tables, with some tables referencing the others) is typically far from straightforward. This is surprising, to say the least, since it is one of the purposes of the "T" (Transform) in the acronym "ETL".

ETL.NET provides clear, straightforward, out-of-the-box tools to handle this **very** common pattern when importing flat structures such as files. The mechanism is so straightforward that it seems almost ***magical***!

## Principle

This normalization is possible thanks to two features:

- A way to know the original row or rows that the current row originates from
- The ability to run a Select/Update/Insert for each row to save, using `Paillave.EtlNet.EntityFrameworkCore` or `Paillave.EtlNet.SqlServer`

The normalization pattern relies on the ability to know which initial row or rows led to a given payload.

Behind the scenes, each row has a list of unique identifiers attached to it, and each operator is expected to know how to combine these identifiers according to its purpose. A stream whose payloads carry such a list of unique identifiers under the hood is called a **correlated stream**.
By default, streams are not correlated, for performance and memory reasons. To become correlated, a stream must pass through a dedicated operator, `SetForCorrelation`. This operator simply assigns to each row a list containing a single, distinct unique identifier. Every core ETL.NET operator knows how to handle a correlated stream and issues a properly correlated stream according to its own logic.

For example, let's imagine this sequence of events:

| Value1 | Value2 |
| - | - |
| 1 | z |
| 2 | z |
| 3 | d |
| 4 | d |
| 5 | e |
| 6 | d |
| 7 | d |
| 8 | e |

Correlating this stream using `SetForCorrelation` would change it this way:

| Value1 | Value2 | Correlation ids |
| - | - | - |
| 1 | z | `dafb56a0-7bfd-482f-8ed2-e47ded1abfe3` |
| 2 | z | `851267eb-50e0-47f9-b988-e17e75a458d2` |
| 3 | d | `8af7348a-c004-4c4d-90db-f6fa5213cabb` |
| 4 | d | `4b1ac39f-89c0-426b-a1e2-07c63aebf938` |
| 5 | e | `c6781c10-0f6b-4668-97e6-2788627aa7a4` |
| 6 | d | `338aabe0-3266-4c2d-9b0e-770b0c0b14dd` |
| 7 | d | `c131bb0b-97fa-4c63-86a5-ebcbeb0800f6` |
| 8 | e | `a1f13dfc-f9e4-46af-9e7a-1e64271f1691` |

Applying a `Distinct` on `Value2` keeps the first row for each value and merges the correlation ids of the rows it groups together:

| Value1 | Value2 | Correlation ids |
| - | - | - |
| 1 | z | `dafb56a0-7bfd-482f-8ed2-e47ded1abfe3`, `851267eb-50e0-47f9-b988-e17e75a458d2` |
| 3 | d | `8af7348a-c004-4c4d-90db-f6fa5213cabb`, `4b1ac39f-89c0-426b-a1e2-07c63aebf938`, `338aabe0-3266-4c2d-9b0e-770b0c0b14dd`, `c131bb0b-97fa-4c63-86a5-ebcbeb0800f6` |
| 5 | e | `c6781c10-0f6b-4668-97e6-2788627aa7a4`, `a1f13dfc-f9e4-46af-9e7a-1e64271f1691` |
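
The two steps above can be mimicked with plain LINQ to make the bookkeeping concrete. This is only an illustrative sketch, not how ETL.NET implements it: the `Row` and `Correlated<T>` types and the `Tag` and `DistinctOn` helpers are hypothetical names introduced for the example.

```cs
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical types for illustration only; they are not part of ETL.NET.
public record Row(int Value1, string Value2);
public record Correlated<T>(T Payload, HashSet<Guid> CorrelationIds);

public static class CorrelationSketch
{
    // Stand-in for SetForCorrelation: each row gets a set holding one fresh id.
    public static List<Correlated<T>> Tag<T>(IEnumerable<T> rows) =>
        rows.Select(r => new Correlated<T>(r, new HashSet<Guid> { Guid.NewGuid() }))
            .ToList();

    // Stand-in for Distinct: keep the first payload of each key
    // and merge the correlation ids of every row sharing that key.
    public static List<Correlated<T>> DistinctOn<T, TKey>(
        IEnumerable<Correlated<T>> rows, Func<T, TKey> getKey) =>
        rows.GroupBy(r => getKey(r.Payload))
            .Select(g => new Correlated<T>(
                g.First().Payload,
                g.SelectMany(r => r.CorrelationIds).ToHashSet()))
            .ToList();
}
```

Calling `CorrelationSketch.DistinctOn(CorrelationSketch.Tag(rows), r => r.Value2)` on the eight rows of the example would yield three payloads (`z`, `d` and `e`), each carrying the ids of all the rows it stands for.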

Then, let's save the payloads of this stream based on the business key (`Value2`). Since the save operation retrieves the Id whether or not an occurrence already exists in the database, we get this result:

| Value2 | Id | Correlation ids |
| - | - | - |
| z | 45 | `dafb56a0-7bfd-482f-8ed2-e47ded1abfe3`, `851267eb-50e0-47f9-b988-e17e75a458d2` |
| d | 69 | `8af7348a-c004-4c4d-90db-f6fa5213cabb`, `4b1ac39f-89c0-426b-a1e2-07c63aebf938`, `338aabe0-3266-4c2d-9b0e-770b0c0b14dd`, `c131bb0b-97fa-4c63-86a5-ebcbeb0800f6` |
| e | 13 | `c6781c10-0f6b-4668-97e6-2788627aa7a4`, `a1f13dfc-f9e4-46af-9e7a-1e64271f1691` |
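
The important property of this save step is that it yields an Id in both cases. A minimal sketch of that "get or insert on the business key" behavior, using a hypothetical in-memory table in place of a real database (the `InMemoryTable` class is an assumption made for illustration, not an ETL.NET type):

```cs
using System.Collections.Generic;

// Hypothetical in-memory table with an identity column, keyed by a business key.
public class InMemoryTable
{
    private readonly Dictionary<string, int> _idsByKey = new();
    private int _nextId = 1;

    // Returns the existing Id when the business key is already known,
    // otherwise "inserts" the key and returns the newly generated Id.
    public int Save(string businessKey)
    {
        if (_idsByKey.TryGetValue(businessKey, out var existingId))
            return existingId;

        var newId = _nextId++;
        _idsByKey[businessKey] = newId;
        return newId;
    }
}
```

Saving `z` twice returns the same Id both times, which is what allows the rows saved later to reference it reliably.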

Now, for each event of the first correlated stream, we can get the related event from our last stream using the operator `CorrelateToSingle`. For each event of the first stream, this operator looks for the first event of the second stream whose correlation ids include one of the current event's correlation ids. The result is the following:

| Value1 | Value2 | Value2Id | Correlation ids |
| - | - | - | - |
| 1 | z | 45 | `dafb56a0-7bfd-482f-8ed2-e47ded1abfe3` |
| 2 | z | 45 | `851267eb-50e0-47f9-b988-e17e75a458d2` |
| 3 | d | 69 | `8af7348a-c004-4c4d-90db-f6fa5213cabb` |
| 4 | d | 69 | `4b1ac39f-89c0-426b-a1e2-07c63aebf938` |
| 5 | e | 13 | `c6781c10-0f6b-4668-97e6-2788627aa7a4` |
| 6 | d | 69 | `338aabe0-3266-4c2d-9b0e-770b0c0b14dd` |
| 7 | d | 69 | `c131bb0b-97fa-4c63-86a5-ebcbeb0800f6` |
| 8 | e | 13 | `a1f13dfc-f9e4-46af-9e7a-1e64271f1691` |
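
The matching rule described above can be sketched in plain C#. This is an illustration under assumptions, not the ETL.NET implementation: `Correlated<T>` and `CorrelateFirstMatch` are hypothetical names, and the sketch simply skips left rows that have no match, which may differ from what the real operator does.

```cs
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical type for illustration only; it is not part of ETL.NET.
public record Correlated<T>(T Payload, HashSet<Guid> CorrelationIds);

public static class CorrelateSketch
{
    // For each left row, take the first right row sharing at least one
    // correlation id, and combine both payloads. The result keeps the
    // correlation ids of the left row.
    public static List<Correlated<TOut>> CorrelateFirstMatch<TLeft, TRight, TOut>(
        IEnumerable<Correlated<TLeft>> left,
        IEnumerable<Correlated<TRight>> right,
        Func<TLeft, TRight, TOut> combine)
    {
        var rightRows = right.ToList();
        var result = new List<Correlated<TOut>>();
        foreach (var l in left)
        {
            var match = rightRows.FirstOrDefault(
                r => r.CorrelationIds.Overlaps(l.CorrelationIds));
            if (match is null) continue; // assumption: unmatched rows are dropped
            result.Add(new Correlated<TOut>(
                combine(l.Payload, match.Payload), l.CorrelationIds));
        }
        return result;
    }
}
```

The linear scan keeps the sketch short; with large streams, an index from correlation id to right row would avoid rescanning the right side for every left row.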

This stream can now be saved in the database as well, using the Ids of the previously saved rows as foreign keys.

## Practically

Below is the structure of the file to import:

| column name | Description |
| - | - |
| title | Title of the blog post |
| author | Author name of the post |
| email | Email of the post author |
| timestamp | Date and time when the post was submitted |
| category | Category of the post |
| link | Only for posts with a link: URL of the link |
| post | Only for posts with text: text of the post |

The file would look like the following:

```csv title="post.csv"
title,author,email,timestamp,category,link,post
FundProcess features,Stéphane Royer,stephane.royer@fundprocess.lu,20210109113005,Category 2,https://www.fundprocess.lu/features/,
ETL.NET revealed,Paillave,admroyer@hotmail.com,20210508181126,Category 2,,"This a post, about ETL.NET"
ETL.NET page,Paillave,admroyer@hotmail.com,20210504164510,Category 1,https://paillave.github.io/Etl.Net/,
FundProcess presentation,Stéphane Royer,stephane.royer@fundprocess.lu,20210203124051,Category 2,,"This a ""post"", about FundProcess"
FundProcess website,Stéphane Royer,stephane.royer@fundprocess.lu,20210106103005,Category 1,http://www.fundprocess.lu,
ETL.NET nuget,Paillave,admroyer@hotmail.com,20200504164510,Category 1,http://www.nuget.org/packages/Etl.Net,
ETL.NET information,Paillave,admroyer@hotmail.com,20200518071024,Category 3,,"This ""another post"" about ETL.NET"
FundProcess information,Stéphane Royer,stephane.royer@fundprocess.lu,20210819124550,Category 1,,This another post about FundProcess
```

The file must be imported into the following normalized structure:

<Mermaid chart={`
classDiagram
class Author {
    Id:int
    Email:string
    Name:string
}
class Category {
    Id:int
    Code:string
    Name:string
}
class Post {
    <<abstract>>
    Id:int
    DateTime:DateTime
    Title:string
    AuthorId:int
    CategoryId:int?
}
class LinkPost {
    Url:Uri
}
class TextPost {
    Text:string
}
Post --> Author
Post --> Category
LinkPost --|> Post
TextPost --|> Post`}/>

The corresponding Entity Framework DbContext is this one:

```cs title="SimpleDbContext.cs"
using System;
using Microsoft.EntityFrameworkCore;

namespace BlogTutorial.DataAccess
{
    public class SimpleDbContext : DbContext
    {
        private readonly string _connectionString = null;
        public SimpleDbContext() { }
        public SimpleDbContext(string connectionString) => _connectionString = connectionString;
        protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
        {
            optionsBuilder.UseSqlServer(_connectionString ?? @"Server=localhost,1433;Database=BlogTutorial;user=BlogTutorial;password=TestEtl.TestEtl;MultipleActiveResultSets=True");
        }
        protected override void OnModelCreating(ModelBuilder modelBuilder)
        {
            var authorBuilder = modelBuilder.Entity<Author>();
            authorBuilder.ToTable(nameof(Author));
            authorBuilder.HasKey(i => i.Id);
            authorBuilder.HasIndex(i => i.Email).IsUnique();
            authorBuilder.Property(i => i.Id).UseIdentityColumn();
            authorBuilder.Property(i => i.Name).IsRequired();
            authorBuilder.Property(i => i.Email).HasMaxLength(250).IsRequired();

            var categoryBuilder = modelBuilder.Entity<Category>();
            categoryBuilder.ToTable(nameof(Category));
            categoryBuilder.HasKey(i => i.Id);
            categoryBuilder.HasIndex(i => i.Code).IsUnique();
            categoryBuilder.Property(i => i.Id).UseIdentityColumn();
            categoryBuilder.Property(i => i.Name).IsRequired();
            categoryBuilder.Property(i => i.Code).IsRequired().HasMaxLength(20);

            var postBuilder = modelBuilder.Entity<Post>();
            postBuilder.ToTable(nameof(Post));
            postBuilder.HasKey(i => i.Id);
            postBuilder.HasIndex(i => new { i.AuthorId, i.DateTime }).IsUnique();
            postBuilder.Property(i => i.Id).UseIdentityColumn();
            postBuilder.HasOne(i => i.Author).WithMany().OnDelete(DeleteBehavior.Restrict).HasForeignKey(i => i.AuthorId);
            postBuilder.HasOne(i => i.Category).WithMany().OnDelete(DeleteBehavior.Restrict).HasForeignKey(i => i.CategoryId);

            var linkPostBuilder = modelBuilder.Entity<LinkPost>();
            linkPostBuilder.HasBaseType<Post>();
            linkPostBuilder.Property(i => i.Url).IsRequired().HasConversion(
                uri => uri == null ? null : uri.ToString(),
                value => string.IsNullOrWhiteSpace(value) ? null : new Uri(value));

            var textPostBuilder = modelBuilder.Entity<TextPost>();
            textPostBuilder.HasBaseType<Post>();
            textPostBuilder.Property(i => i.Text).IsRequired();

            var executionLogBuilder = modelBuilder.Entity<ExecutionLog>();
            executionLogBuilder.ToTable(nameof(ExecutionLog));
            executionLogBuilder.HasKey(i => i.Id);
            executionLogBuilder.Property(i => i.Id).UseIdentityColumn();
            executionLogBuilder.Property(i => i.EventType).HasMaxLength(250).IsRequired();
            executionLogBuilder.Property(i => i.Message).IsRequired();
        }
    }
    public class Author
    {
        public int Id { get; set; }
        public string Email { get; set; }
        public string Name { get; set; }
    }
    public class Category
    {
        public int Id { get; set; }
        public string Code { get; set; }
        public string Name { get; set; }
    }
    public abstract class Post
    {
        public int Id { get; set; }
        public DateTime DateTime { get; set; }
        public string Title { get; set; }
        public int AuthorId { get; set; }
        public Author Author { get; set; }
        public int? CategoryId { get; set; }
        public Category Category { get; set; }
    }
    public class LinkPost : Post
    {
        public Uri Url { get; set; }
    }
    public class TextPost : Post
    {
        public string Text { get; set; }
    }
    public class ExecutionLog
    {
        public int Id { get; set; }
        public DateTime DateTime { get; set; }
        public Guid ExecutionId { get; set; }
        public string EventType { get; set; }
        public string Message { get; set; }
    }
}
```

Now, let's apply the theory to this practical situation:

```cs {15,28,29}
private static void DefineProcess(ISingleStream<string> contextStream)
{
    var rowStream = contextStream
        .CrossApplyFolderFiles("list all required files", "*.csv", true)
        .CrossApplyTextFile("parse file", FlatFileDefinition.Create(i => new
        {
            Author = i.ToColumn("author"),
            Email = i.ToColumn("email"),
            TimeSpan = i.ToDateColumn("timestamp", "yyyyMMddHHmmss"),
            Category = i.ToColumn("category"),
            Link = i.ToColumn("link"),
            Post = i.ToColumn("post"),
            Title = i.ToColumn("title"),
        }).IsColumnSeparated(','))
        .SetForCorrelation("set correlation for row");

    var authorStream = rowStream
        .Distinct("remove author duplicates based on emails", i => i.Email)
        .Select("create author instance", i => new Author { Email = i.Email, Name = i.Author })
        .EfCoreSave("save authors", o => o.SeekOn(i => i.Email).AlternativelySeekOn(i => i.Name));

    var categoryStream = rowStream
        .Distinct("remove category duplicates", i => i.Category)
        .Select("create category instance", i => new Category { Code = i.Category, Name = i.Category })
        .EfCoreSave("save categories", o => o.SeekOn(i => i.Code).DoNotUpdateIfExists());

    var postStream = rowStream
        .CorrelateToSingle("get related category", categoryStream, (l, r) => new { Row = l, Category = r })
        .CorrelateToSingle("get related author", authorStream, (l, r) => new { l.Row, l.Category, Author = r })
        .Select("create post instance", i => string.IsNullOrWhiteSpace(i.Row.Post)
            ? new LinkPost
            {
                AuthorId = i.Author.Id,
                CategoryId = i.Category.Id,
                DateTime = i.Row.TimeSpan,
                Title = i.Row.Title,
                Url = new Uri(i.Row.Link)
            } as Post
            : new TextPost
            {
                AuthorId = i.Author.Id,
                CategoryId = i.Category.Id,
                DateTime = i.Row.TimeSpan,
                Title = i.Row.Title,
                Text = i.Row.Post
            })
        .EfCoreSave("save posts", o => o.SeekOn(i => new { i.AuthorId, i.DateTime }));
}
```

:::note

Depending on the situation, the C# compiler sometimes cannot properly resolve which overload of `EfCoreSave` to use and picks the one for non-correlated streams. If so, use `EfCoreSaveCorrelated` instead to avoid confusing the compiler.

:::